package com.kuding.log.components;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.DefaultLogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.kuding.log.interfaces.VinusLogHubProcessor;
import com.kuding.log.interfaces.VinusLogProcessAdpter;

/**
 * Default {@link VinusLogHubProcessor} implementation: converts every fetched
 * log entry into a key/value map and hands it to each configured
 * {@link VinusLogProcessAdpter}, then asks the checkpoint tracker to save a
 * checkpoint.
 */
public class DefaultVinusLogHubProcessor implements VinusLogHubProcessor {

	private static final Log logger = LogFactory.getLog(DefaultVinusLogHubProcessor.class);

	/** Shard id handed over by {@link #initialize(int)}. */
	private int shardId;

	/** Adapters that receive the content of every log entry, in list order. */
	private final List<VinusLogProcessAdpter> processors;

	/** Group name reported by {@link #getGroup()}. */
	private final String group;

	/**
	 * Creates a processor that dispatches log entries to the given adapters.
	 *
	 * @param processors adapters invoked, in list order, for every log entry;
	 *                   the list is stored as-is (no defensive copy)
	 * @param group      group name returned by {@link #getGroup()}
	 */
	public DefaultVinusLogHubProcessor(List<VinusLogProcessAdpter> processors, String group) {
		this.processors = processors;
		this.group = group;
	}

	/**
	 * @return the adapter list passed to the constructor (the same instance)
	 */
	public List<VinusLogProcessAdpter> getProcessors() {
		return processors;
	}

	/**
	 * Remembers the shard this processor instance has been assigned to.
	 *
	 * @param shardId id of the shard
	 */
	@Override
	public void initialize(int shardId) {
		this.shardId = shardId;
	}

	/**
	 * @return the group name passed to the constructor
	 */
	@Override
	public String getGroup() {
		return group;
	}

	/**
	 * @return the shard id received in {@link #initialize(int)}
	 */
	public int getShardId() {
		return shardId;
	}

	/**
	 * Dispatches every log entry of every log group to all configured adapters
	 * and then saves a checkpoint through the tracker.
	 *
	 * @param logGroups         fetched log groups
	 * @param checkPointTracker tracker used to save the checkpoint once all
	 *                          entries have been dispatched
	 * @return always the empty string
	 */
	@Override
	public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
		for (LogGroupData logGroup : logGroups) {
			FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
			int logsCount = fastLogGroup.getLogsCount();
			for (int logIndex = 0; logIndex < logsCount; ++logIndex) {
				FastLog log = fastLogGroup.getLogs(logIndex);
				// Guarded so the message is not concatenated for every entry
				// when debug output is switched off.
				if (logger.isDebugEnabled()) {
					logger.debug("--------\nLog: " + logIndex + ", time: " + log.getTime() + ", GetContentCount: "
							+ log.getContentsCount());
				}
				for (VinusLogProcessAdpter logProcessAdpter : processors) {
					// A fresh map per adapter, so one adapter mutating its map
					// cannot affect what the next adapter receives.
					logProcessAdpter.process(getContent(log));
				}
			}
		}
		try {
			// Called through the interface: no downcast to a concrete tracker
			// class is needed, so any ILogHubCheckPointTracker works here.
			// NOTE(review): 'false' presumably means "do not persist immediately" - confirm against the SDK.
			checkPointTracker.saveCheckPoint(false);
		} catch (LogHubCheckPointException e) {
			logger.error("this is not going to happen", e);
		}
		return "";
	}

	/**
	 * Copies the key/value contents of one log entry into a map.
	 * <p>
	 * Contents are visited from the last index down to 0, so if a key occurs
	 * more than once the value at the lowest index is the one kept.
	 *
	 * @param fastLog log entry to convert
	 * @return a new mutable map holding the entry's contents
	 */
	private Map<String, String> getContent(FastLog fastLog) {
		int count = fastLog.getContentsCount();
		Map<String, String> map = new HashMap<>(count);
		while (--count >= 0) {
			FastLogContent fastLogContent = fastLog.getContents(count);
			map.put(fastLogContent.getKey(), fastLogContent.getValue());
		}
		return map;
	}

	/**
	 * Saves a final checkpoint when the processor is shut down. A failure is
	 * logged and not propagated to the caller.
	 *
	 * @param checkPointTracker tracker used to save the checkpoint
	 */
	@Override
	public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
		try {
			checkPointTracker.saveCheckPoint(true);
		} catch (LogHubCheckPointException e) {
			// Logged at error level, matching how process() reports the same failure.
			logger.error("shutdown error! ", e);
		}
	}

}
